/*
 * Copyright 2016 LinkedIn Corp.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.linkedin.drelephant.util;

import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfiguration;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.schedulers.Scheduler;

import java.lang.reflect.InvocationTargetException;
import java.util.List;
import java.util.Properties;

import org.apache.log4j.Logger;
import org.hit.monitor.model.YarnAppResultDO;
import org.w3c.dom.Document;

import com.linkedin.drelephant.mapreduce.data.MapReduceApplicationData;


/**
 * InfoExtractor is responsible for retrieving information and context about a
 * job from the job's configuration
 */
public class InfoExtractor {
  private static final Logger logger = Logger.getLogger(InfoExtractor.class);

  // NOTE(review): not referenced anywhere in this file — presumably kept for Spark support; confirm before removing.
  private static final String SPARK_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";

  private static final String SCHEDULER_CONF = "SchedulerConf.xml";

  // All scheduler definitions from SchedulerConf.xml, loaded once at class-initialization time.
  private static final List<SchedulerConfigurationData> _configuredSchedulers;

  /**
   * Load all the schedulers configured in SchedulerConf.xml
   */
  static {
    Document document = Utils.loadXMLDoc(SCHEDULER_CONF);
    _configuredSchedulers = new SchedulerConfiguration(document.getDocumentElement()).getSchedulerConfigurationData();
    for (SchedulerConfigurationData data : _configuredSchedulers) {
      logger.info(String.format("Load Scheduler %s with class : %s", data.getSchedulerName(), data.getClassName()));
    }
  }

  /**
   * Find the scheduler which scheduled the job.
   *
   * Each configured scheduler class is instantiated reflectively with
   * (appId, properties, configData); the first one whose {@code isEmpty()}
   * returns false (i.e. it recognized the job from its properties) wins.
   *
   * @param appId The application id
   * @param properties The application properties
   * @return the corresponding Scheduler which scheduled the job, or null if
   *         properties is null or no configured scheduler claims the job
   * @throws RuntimeException if a configured scheduler class cannot be loaded,
   *         constructed, or is not a {@link Scheduler} implementation
   */
  public static Scheduler getSchedulerInstance(String appId, Properties properties) {
    if (properties != null) {
      for (SchedulerConfigurationData data : _configuredSchedulers) {
        try {
          Class<?> schedulerClass = Class.forName(data.getClassName());
          Object instance = schedulerClass.getConstructor(String.class, Properties.class, SchedulerConfigurationData.class).newInstance(appId, properties, data);
          if (!(instance instanceof Scheduler)) {
            throw new IllegalArgumentException(
                    "Class " + schedulerClass.getName() + " is not an implementation of " + Scheduler.class.getName());
          }
          Scheduler scheduler = (Scheduler)instance;
          // isEmpty() == true means this scheduler found none of its marker
          // properties in the job conf; keep probing the remaining schedulers.
          if (!scheduler.isEmpty()) {
            return scheduler;
          }
        } catch (ClassNotFoundException e) {
          throw new RuntimeException("Could not find class " + data.getClassName(), e);
        } catch (InstantiationException e) {
          throw new RuntimeException("Could not instantiate class " + data.getClassName(), e);
        } catch (IllegalAccessException e) {
          throw new RuntimeException("Could not access constructor for class " + data.getClassName(), e);
        } catch (RuntimeException e) {
          // Also catches the IllegalArgumentException thrown above for a
          // non-Scheduler class, re-wrapping it with the class name.
          throw new RuntimeException(data.getClassName() + " is not a valid Scheduler class.", e);
        } catch (InvocationTargetException e) {
          throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
        } catch (NoSuchMethodException e) {
          throw new RuntimeException("Could not find constructor for class " + data.getClassName(), e);
        }
      }
    }
    return null;
  }

  /**
   * Loads result with the info depending on the application type
   *
   * If no scheduler recognizes the job, or the scheduler integration did not
   * supply all four ids (jobDefId, jobExecId, flowDefId, flowExecId), the job
   * is treated as adhoc via {@link #loadNoSchedulerInfo(YarnAppResultDO)}.
   *
   * @param result The jobResult to be loaded with.
   * @param data The Hadoop application data
   */
  public static void loadInfo(YarnAppResultDO result, HadoopApplicationData data) {
    Properties properties = new Properties();
    if (data instanceof MapReduceApplicationData) {
      properties = retrieveMapreduceProperties((MapReduceApplicationData) data);
    }

    Scheduler scheduler = getSchedulerInstance(data.getAppId(), properties);

    if (scheduler != null) {
      String appId = data.getAppId();

      // Load all the Ids
      result.setJobDefId(Utils.truncateField(scheduler.getJobDefId(), YarnAppResultDO.URL_LEN_LIMIT, appId));
      result.setJobExecId(Utils.truncateField(scheduler.getJobExecId(), YarnAppResultDO.URL_LEN_LIMIT, appId));
      result.setFlowDefId(Utils.truncateField(scheduler.getFlowDefId(), YarnAppResultDO.URL_LEN_LIMIT, appId));
      result.setFlowExecId(Utils.truncateField(scheduler.getFlowExecId(), YarnAppResultDO.FLOW_EXEC_ID_LIMIT, appId));

      // Dr. Elephant expects all the 4 ids(jobDefId, jobExecId, flowDefId, flowExecId) to be set.
      // BUGFIX: the last clause previously re-checked getJobExecId(), so a
      // missing flowExecId slipped through validation.
      if (!Utils.isSet(result.getJobDefId()) || !Utils.isSet(result.getJobExecId())
          || !Utils.isSet(result.getFlowDefId()) || !Utils.isSet(result.getFlowExecId())) {
        logger.warn("This job doesn't have the correct " + scheduler.getSchedulerName() + " integration support. I"
            + " will treat this as an adhoc job");
        loadNoSchedulerInfo(result);
      } else {
        result.setScheduler(Utils.truncateField(scheduler.getSchedulerName(), YarnAppResultDO.SCHEDULER_LIMIT, appId));
        result.setWorkflowDepth(scheduler.getWorkflowDepth());
        result.setJobName(truncateOrEmpty(scheduler.getJobName(), YarnAppResultDO.JOB_NAME_LIMIT, appId));
        result.setJobDefUrl(truncateOrEmpty(scheduler.getJobDefUrl(), YarnAppResultDO.URL_LEN_LIMIT, appId));
        result.setJobExecUrl(truncateOrEmpty(scheduler.getJobExecUrl(), YarnAppResultDO.URL_LEN_LIMIT, appId));
        result.setFlowDefUrl(truncateOrEmpty(scheduler.getFlowDefUrl(), YarnAppResultDO.URL_LEN_LIMIT, appId));
        result.setFlowExecUrl(truncateOrEmpty(scheduler.getFlowExecUrl(), YarnAppResultDO.URL_LEN_LIMIT, appId));
      }
    } else {
      loadNoSchedulerInfo(result);
    }
  }

  /**
   * Truncates the given field to the limit, or returns "" when the value is null.
   *
   * @param value the raw field value (may be null)
   * @param limit the maximum allowed length
   * @param appId the application id, used by Utils.truncateField for logging
   * @return the truncated value, or the empty string if value was null
   */
  private static String truncateOrEmpty(String value, int limit, String appId) {
    return value != null ? Utils.truncateField(value, limit, appId) : "";
  }

  /**
   * Retrieve the Mapreduce properties
   *
   * @param appData the Mapreduce Application Data
   * @return the retrieved mapreduce properties
   */
  public static Properties retrieveMapreduceProperties(MapReduceApplicationData appData) {
    return appData.getConf();
  }

  /**
   * A temporary solution that SPARK 1.2 need to escape '&' with '\&' in its javaOptions.
   * This is the reverse process that recovers the escaped string.
   *
   * NOTE(review): currently unreferenced in this file; kept in case callers
   * outside this view relied on it historically — confirm before removing.
   *
   * @param s The string to unescape
   * @return The original string, or null if s is null
   */
  private static String unescapeString(String s) {
    if (s == null) {
      return null;
    }
    // Literal replacement of "\&" with "&"; equivalent to the previous
    // regex replaceAll("\\\\\\&", "\\&") without the double-escaping hazard.
    return s.replace("\\&", "&");
  }

  /**
   * Update the application result with adhoc(not scheduled by a scheduler) information
   *
   * @param result The AppResult to be updated
   */
  private static void loadNoSchedulerInfo(YarnAppResultDO result) {
    result.setScheduler(null);
    result.setWorkflowDepth(0);
    result.setJobExecId("");
    result.setJobDefId("");
    result.setFlowExecId("");
    result.setFlowDefId("");
    result.setJobExecUrl("");
    result.setJobDefUrl("");
    result.setFlowExecUrl("");
    result.setFlowDefUrl("");
    result.setJobName("");
  }
}
